心温まるSlackの投稿を抽出するためにサーバーレスなデータ分析基盤を構築しよう!!
CX事業本部@大阪の岩田です。
クラスメソッドでは社内標準のチャットツールとしてSlackを活用していますが、「分報」という形でSlackを活用しているメンバーも数多くいます。「分報」って何?という方は以下のリンクをご確認下さい。
- 社内にSlack上での分報を導入しないかと提案してみた
- Slackで簡単に「日報」ならぬ「分報」をチームで実現する3ステップ〜Problemが10分で解決するチャットを作ろう
- Slackで「分報」を導入したらめっちゃ作業効率があがった
人気の分報ともなると参加者が50人を超え、本人のいないところで好き勝手に雑談が繰り広げられていたりします。このレベルになると人気が高いのか、それとも単に治安が悪いだけなのかよく分からなくなってきます。
「俺の分報がこんなに治安が悪いわけがない!Comprehendで証明してみた」ブログはよ
?!!!
ということでSlackの投稿をComprehendの感情分析にかけつつ分析するための基盤をサーバーレスで構築してみました。
構成
こんな環境を作ります。
1日1回Lambdを起動し、指定されたチャンネルの1日分の投稿を収集し、各投稿に対してComprehendの感情分析結果をマージしてS3に保存。保存したデータをAthenaから分析します。
以前SlackのイベントでLTさせてもらった時に作った環境にComprehendの感情分析を追加した構成です。良ければ以下の資料もご覧ください。
環境構築手順
それでは早速環境を構築していきましょう。プロジェクトのディレクトリ構造は以下のような形になります。
. ├── layer │ └── python ├── src │ └── app.py └── template.yaml
ソースコード
まずはLambdaのソースコードです。エラーハンドリングだったり大量データの考慮を無視した手抜き実装なので、業務利用を検討される場合は適宜修正をお願いします。Python3.7で実装しています。
from datetime import datetime, timedelta, timezone import os import logging import json import time import boto3 from slack.web.client import WebClient from slack.errors import SlackApiError SLACK_CHANNEL_ID = os.environ['SLACK_CHANNEL_ID'] S3_BUCKET = os.environ['S3_BUCKET'] SLACK_TOKEN = os.environ['SLACK_TOKEN'] COMPREHEND_BATCH_SIZE = 25 s3 = boto3.resource('s3') comprehend = boto3.client('comprehend') logger = logging.getLogger() def handler(event, context): try: print('start lambda') collect_messages() print('finish lambda') except Exception as e: logger.error(e) raise def collect_messages(interval_days=1): JST = timezone(timedelta(hours=+9), 'JST') today = datetime.now(JST).replace(hour=0, minute=0, second=0, microsecond=0) target_date = today - timedelta(days=interval_days) unix_target_date_start = int(target_date.timestamp()) unix_target_date_end = int((target_date + timedelta(days=1)).timestamp()) -1 slack_clinet = WebClient(SLACK_TOKEN) latest = unix_target_date_end channels_histories = [] while True: res = slack_clinet.channels_history( channel=SLACK_CHANNEL_ID, latest=latest,inclusive='false', oldest=unix_target_date_start) # 添付ファイルのみの投稿などをフィルタ ※空文字列をcomprehendに渡すとエラーになる messages = [msg for msg in res.data['messages'] if msg['text'] != ''] # comprehendのバッチサイズに合わせて分割 splited_messages = [messages[i:i + COMPREHEND_BATCH_SIZE] for i in range(0, len(messages), COMPREHEND_BATCH_SIZE)] for msgs in splited_messages: comprehend_res = comprehend.batch_detect_sentiment( TextList=[msg['text'] for msg in msgs], LanguageCode='ja' ) # comprehendの分析結果をマージ sentiments = comprehend_res['ResultList'] for m, c in zip(list(msgs), [{'Comprehend': sentiment } for sentiment in sentiments]): channels_histories.append(json.dumps({ **m, **c }, ensure_ascii=False)) # channels.historyのレスポンスに続きがなくなるまでループ if res.data['has_more'] == False: break print('channels.history has more history continue collect messages...') latest = int(float(res.data['messages'][-1]['ts'])) if len(channels_histories) == 0: print('no messages... skip upload file') return print('start upload s3') obj_key = target_date.strftime('messages/year=%Y/month=%m/day=%d') + '/slack_messages.json' obj = s3.Object(S3_BUCKET, obj_key) body = '\n'.join(channels_histories) obj.put(Body=body.encode('utf-8'))
SAMテンプレート
続いてSAMテンプレートです。利用するS3バケットやGlueのクローラ等などを作成します。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > Collect and analyze Slack messages Globals: Function: Timeout: 300 Parameters: SlackChannelId: Type: String SlackToken: NoEcho: true Type: String Resources: ScrapeMessages: Type: AWS::Serverless::Function Properties: CodeUri: src/ Handler: app.handler Runtime: python3.7 MemorySize: 512 Role: !GetAtt LambdaExecuteRole.Arn Environment: Variables: SLACK_CHANNEL_ID: !Ref SlackChannelId SLACK_TOKEN: !Ref SlackToken S3_BUCKET: !Ref S3Bucket Events: Cron: Type: Schedule Properties: Schedule: cron(0 16 * * ? *) Description: Example schedule Enabled: True Layers: - !Ref SlackLibLayer SlackLibLayer: Type: AWS::Serverless::LayerVersion Properties: Description: slack libraly layer ContentUri: layer LambdaExecuteRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - 'lambda.amazonaws.com' Action: sts:AssumeRole Policies: - PolicyName: "allow_cloud_watch_logs" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: "*" - PolicyName: "allow_s3_access" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - 's3:GetObject' - 's3:PutObject' Resource: - !Sub "arn:aws:s3:::${S3Bucket}/*" - PolicyName: "allow_comprehend" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - 'comprehend:BatchDetectSentiment' Resource: - "*" GlueCrawlerRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - 'glue.amazonaws.com' Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole - arn:aws:iam::aws:policy/AmazonS3FullAccess Policies: - PolicyName: "allow_cloud_watch_logs" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: "*" S3Bucket: Type: AWS::S3::Bucket DeletionPolicy: Delete SlackDB: Type: AWS::Glue::Database Properties: CatalogId: !Ref AWS::AccountId DatabaseInput: Description: "Slack Messages DB" SlackCrawler: Type: AWS::Glue::Crawler Properties: Configuration: | {"Version":1.0,"CrawlerOutput":{"Partitions":{"AddOrUpdateBehavior":"InheritFromTable"},"Tables":{"AddOrUpdateBehavior":"MergeNewColumns"}}} DatabaseName: !Ref SlackDB Role: !Ref GlueCrawlerRole Schedule: ScheduleExpression: "cron(30 16 * * ? *)" SchemaChangePolicy: UpdateBehavior: "UPDATE_IN_DATABASE" DeleteBehavior: "DEPRECATE_IN_DATABASE" Targets: S3Targets: - Path: !Sub "s3://${S3Bucket}/messages" ScrapeMessagesLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub /aws/lambda/${ScrapeMessages} RetentionInDays: 14
デプロイ
デプロイします。今回の構成ではSlackのライブラリをLayerにパッケージングして利用しているので、まずはLayerの準備をします。
pip install slackclient -t layer/python/
レイヤーの準備ができたのでパッケージングしてデプロイします。
$ sam package --s3-bucket <適当なS3バケット> --template-file template.yaml --output-template-file output.yml $ sam deploy --stack-name <スタック名> --template-file output.yml --capabilities CAPABILITY_IAM --parame ter-overrides SlackToken=<Slackのトークン> SlackChannelId=<分析対象のSlackチャンネルID>
Lambda&クローラの実行
デプロイできたらLambdaを実行して投稿を収集した後Glueのクローラを実行してテーブルを作成しましょう。
$ aws lambda invoke --function-name <作成されたLambdaFunctionの名前> - $ aws glue start-crawler --name <作成されたクローラの名前>
過去データの収集
ここまでで一応環境構築は終了です。あとは日次処理でどんどんデータが貯まっていきます。が、日次処理で収集するデータは前日分のデータだけです。初回はある程度過去まで遡ってデータを収集したいところです。ということで、ローカル環境から以下のコードを実行して過去データを収集します。
import os os.environ['SLACK_TOKEN'] = <Slackのトークン> os.environ['SLACK_CHANNEL_ID'] = <分析対象のSlackチャンネルID> os.environ['S3_BUCKET'] = <作成されたS3バケット> from app import collect_messages for i in range(2,51): collect_messages(i)
過去データが収集できたらもう一度Glueのクローラを実行しておきましょう。※今回の分析対象にはオーバースペックなのですがパーティションを設定しているため。
Positiveな投稿TOP10を抽出
分析の準備ができたので、Athenaで分析していきます。とりあえずPositive判定された投稿TOP10でも抽出してみましょう。
SELECT comprehend.sentimentscore.positive, text FROM messages WHERE comprehend.sentiment = 'POSITIVE' ORDER BY comprehend.sentimentscore.positive DESC LIMIT 10
シンプルなSQLですね。実行してみます。
なんも問題ないです!!!むしろ最高です!!!!視線を落としたらさっきまでそこになかったうどん札が発生する体験最高でした!!!!!ありがとうございます!!!
一番ポジティブな投稿は丸亀製麺のうどん札に関する投稿でした。なんとスコア0.9996950626373291を叩き出しています。
Positiveな投稿に対するリアクションとして使われる絵文字TOP5を分析
せっかくなのでもう1パターン分析してみましょう。Slackの特徴の1つとして、書き込みに対して絵文字を使ってリアクションするという機能が挙げられます。Positve判定された書き込みに対するリアクションとして利用頻度の高い絵文字を抽出してみましょう。
リアクションのデータ構造
投稿に対するリアクションの情報はchannels.history
APIのレスポンスに以下の形式で含まれています。
... "reactions": [ { "name": "arigato", "users": [ "xxx" ], "count": 1 }, { "name": "atodeyomu", "users": [ "yyy" ], "count": 1 }, { "name": "mapicon-onsen", "users": [ "zzz" ], "count": 1 } ] ...
この情報をAthenaで分析してみます。
リアクションの配列を行に変換する
リアクションの情報は前述の通り配列になっています。そのため単純にSELECTすると配列として取得されます。例えば、このSQLを実行すると
SELECT reactions FROM messages WHERE client_msg_id = 'xxxxx'
結果は...
reactions | |
---|---|
1 | [{name=arigato, users=[UEN3NS8S1], count=1}, {name=atodeyomu, users=[UEN3NS8S1], count=1}, {name=mapicon-onsen, users=[UFE2V64CR], count=1}] |
このように生データそのままです。SQLで集計するためにはデータの構造を配列から行形式に変換したいところです。こういったケースではunnest
とCORSS JOIN
を使うことで配列を行に展開することが可能です。以下のSQLを実行してみましょう。
SELECT react.react, react.react.name, react.react.count FROM messages CROSS JOIN UNNEST(reactions) AS react(react) WHERE client_msg_id = 'xxxxx'
実行結果です。
react | name | count | |
---|---|---|---|
1 | {name=arigato, users=[UEN3NS8S1], count=1} | arigato | 1 |
2 | {name=atodeyomu, users=[UEN3NS8S1], count=1} | atodeyomu | 1 |
3 | {name=mapicon-onsen, users=[UFE2V64CR], count=1} | mapicon-onsen | 1 |
あとは普通にnameとcountで集計してやれば良さそうですね。というわけで以下のSQLを実行してみましょう。
WITH positive AS ( SELECT reactions FROM messages WHERE comprehend.sentiment = 'POSITIVE' ) SELECT react.name, SUM(react.count) FROM positive CROSS JOIN UNNEST(reactions) AS react(react) GROUP BY react.name ORDER BY SUM(react.count) DESC
さてさて結果は...
1位... kawaii
2位... omedeto
3位... aa
4位... e
5位... tensaideha
課題
さて、ここまでの作業で一見うまくデータ分析基盤が構築できたように見えます。しかしながら、この構成は1つ大きな課題を抱えています。Slackの特徴であるカスタム絵文字はあくまで :<絵文字の名前>:
という文字列でしかないので、投稿内でカスタム絵文字を利用していた場合は適切な感情分析ができないのです。例えば、以下の投稿はいかにもポジティブな投稿に見えます。
が、Comprehendの感情分析にかけると...
$ aws comprehend batch-detect-sentiment --language-code ja --text-list :ieei1::ieei2::ieei2::ieei2::ieei3::ieei3::ieei2::ieei4: { "ResultList": [ { "Index": 0, "Sentiment": "NEUTRAL", "SentimentScore": { "Positive": 0.004755615256726742, "Negative": 0.00036574306432157755, "Neutral": 0.9948752522468567, "Mixed": 3.469406692602206e-06 } } ], "ErrorList": [] }
NEUTRAL
と分類されます。自前のモデルを作れば解決できそうですが、そこまでパワーをかけなくても今回の目的は達成できそうなので、とりあえずこの課題については無視しようと思います。
今後の発展性について
AWS上にデータ分析基盤を構築する際はS3を中心に据えるのがベストプラクティスです。RedshiftやEMR、Amazon ES...データ分析に使えるサービスは多数ありますが、元データはあくまでS3に保存しておき必要に応じて別のサービスにロードする。あるいはRedshift Spectrumのような機能で別のサービスからS3上のデータを参照する構成を取ることで、色々と応用の幅が広がります。今回の例でも分析対象のデータは全てS3上に保存しており、保存先のS3バケットはデータアナリティクス事業本部のメンバーからクロスアカウントアクセスできるように設定済みです。
きっとこれから色々な分析にかけられ、分析結果が色々なツールで可視化されることでしょう。
まとめ
サーバーレスアーキテクチャを活用したデータ分析基盤構築のご紹介でした。まあ、Slackの分報を分析して喜ぶ人はごく一部だと思いますが、今回紹介した手法を少し応用すれば色々とビジネスにも活用できそうな気がします。例えばTwitterから自社製品に関するツイートを取得、Comprehendで感情分析にかけてNEGATIVEなツイートを自動抽出。NEGATIVEなツイートは人間がチェックして製品改善のヒントを得る。とかとか。
需要がありそうなら少しリファクタリングしたものをSAR(Serverless Application Repository)で公開することも考えたいなーと思います。